package org.voovan.korla.handler;

import org.voovan.korla.KorlaStatic;
import org.voovan.korla.message.Callback;
import org.voovan.korla.message.Msg;
import org.voovan.korla.socket.KorlaProvider;
import org.voovan.network.EventProcess;
import org.voovan.network.EventTrigger;
import org.voovan.network.IoSession;
import org.voovan.network.exception.SendMessageException;
import org.voovan.tools.event.EventRunner;
import org.voovan.tools.event.EventRunnerGroup;
import org.voovan.tools.log.Logger;
import org.voovan.tools.log.Message;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import static org.voovan.korla.KorlaStatic.getChannel;


/**
 * 服务类业务处理句柄
 *
 * @author: helyho
 * korla Framework.
 * WebSite: https://github.com/helyho/korla
 * Licence: Apache v2 License
 */
public class ProviderHandler implements Handler {
    private static EventRunnerGroup EVENT_RUNNER_GROUP = EventRunnerGroup.newInstance("ProviderEventRunner", KorlaStatic.PROVIDER_RUNNER_COUNT, false, 5, null);
    static {
        EVENT_RUNNER_GROUP.process();
    }

    //<Channel, CallBack>
    private ConcurrentHashMap<String, Callback> channelCallback = new ConcurrentHashMap<String, Callback>();

    private KorlaProvider korlaProvider;

    public ProviderHandler(KorlaProvider korlaProvider) {
        this.korlaProvider = korlaProvider;
    }

    public void addCallback(String channel, Callback callback) {
        channelCallback.put(channel, callback);
    }

    public Callback getCallback(String channel) {
        return channelCallback.get(channel);
    }

    @Override
    public Msg connect(IoSession session) {
        String remoteAddress = session.remoteAddress();
        if(korlaProvider.getAllowAddress().contains(remoteAddress)) {
            //缓存 session
            String channel = getChannel(session);
            if(channel!=null) {
                korlaProvider.bindSession(channel, session);
            } else {
                Logger.warn("[Korla] ProviderHandler.connect error, channel is null, socket will be close");
                session.close();
            }
        } else {
            session.close();
            Logger.error("Deny address " + remoteAddress + "  to connect");
        }
        return null;
    }

    @Override
    public void disconnect(IoSession session) {
        String channel = getChannel(session);
        if(channel!=null) {
            korlaProvider.sessionCaches.get(channel).remove(session);
        }
    }

    @Override
    public Msg receive(IoSession session, Msg msg) {
        EventRunner eventRunnerGroup = EVENT_RUNNER_GROUP.choseEventRunner();

        //选取执行线程
        try {
            EventRunner eventRunner = EVENT_RUNNER_GROUP.getEventRunners()[calcSliceIndex(msg.getSliceKey())];

            int remainSize = session.getReadByteBufferChannel().size();

            Runnable runnable = () -> {
                //调用 KorlaProvicer 的 callback
                Msg retMsg = channelCallback.get(msg.getChannel()).execute(msg);
                EventProcess.sendMessage(session, retMsg);

                //判断是否预处理完所有的请求字节, 用于批量发送
                if (remainSize == 0) {
                    eventRunner.addEvent(() -> session.flush());
                }
            };

            eventRunner.addEvent(5, runnable);
        } catch (Exception e) {
            e.printStackTrace();
        }


        return null;
    }

    /**
     * 使用分片算法选取执行线程
     * @param sliceKey 分派用到的 key
     * @return 分片的 id
     */
    public int calcSliceIndex(Object sliceKey) {
        //调用分片算法分片
        Integer sliceIndex = null;
        if(korlaProvider.getSliceAlgorithm()!=null) {
            sliceIndex = korlaProvider.getSliceAlgorithm().apply(sliceKey);
        }

        //计算分片索引
        if(sliceIndex!=null) {
            sliceIndex = sliceIndex%KorlaStatic.PROVIDER_RUNNER_COUNT;
        } else {
            //这里使用线程 id 是因为在上层框架中会话是和线程绑定的
            //与session.socketSelector().getEventRunner().getThread().getId() 等价
            sliceIndex = (int)(Thread.currentThread().getId()%KorlaStatic.PROVIDER_RUNNER_COUNT);
        }

        //如果用户设置的分片数大于了系统默认的分片数
        if(sliceIndex >= KorlaStatic.PROVIDER_RUNNER_COUNT) {
            sliceIndex = sliceIndex%KorlaStatic.PROVIDER_RUNNER_COUNT;
        }

        sliceIndex = Math.abs(sliceIndex);
        return sliceIndex;
    }

    @Override
    public void exception(IoSession session, Exception e) {
        Logger.errorf("ProviderHandler {} error", e, session.toString());
    }
}
